import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

Stream<int> _getStream() {
  final controller = StreamController<int>();

  Timer(const Duration(milliseconds: 100), () => controller.add(1));
  Timer(const Duration(milliseconds: 200), () => controller.add(2));
  Timer(const Duration(milliseconds: 300), () => controller.add(3));
  Timer(const Duration(milliseconds: 400), () {
    controller.add(4);
    controller.close();
  });

  return controller.stream;
}

Stream<int> _getOtherStream() {
  final controller = StreamController<int>();

  Timer(const Duration(milliseconds: 250), () {
    controller.add(1);
    controller.close();
  });

  return controller.stream;
}

class TakeTestPage extends TestPage {
  TakeTestPage(super.title) {
    test('Rx.takeLast', () async {
      final stream = Stream.fromIterable([1, 2, 3, 4, 5]).takeLast(3);
      expect(stream);
    });

    test('Rx.takeLast.zero', () async {
      var count = 0;
      final values = [1, 2, 3, 4, 5];
      final stream = Stream.fromIterable(values).doOnData((_) => count++).takeLast(0);
      expect(stream,);
      expect(count);
    });

    test('Rx.takeLast.emitsError', () async {
      final stream = Stream<int>.error(Exception()).takeLast(3);
      expect(stream);
    });

    test('Rx.takeLast.countCantBeNegative', () async {
      Stream<int> stream() => Stream.fromIterable([1, 2, 3, 4, 5]).takeLast(-1);
      expect(stream);
    });

    test('Rx.takeLast.reusable', () async {
      final transformer = TakeLastStreamTransformer<int>(3);
      Stream<int> stream() => Stream.fromIterable([1, 2, 3, 4, 5]).takeLast(3);

      stream().transform(transformer).listen(((result) {
        expect(result);
      }));

      stream().transform(transformer).listen(((result) {
        expect(result);
      }));
    });

    test('Rx.takeLast.asBroadcastStream', () async {
      final stream =
      Stream.fromIterable([1, 2, 3, 4, 5]).takeLast(3).asBroadcastStream();

      stream.listen(null);
      stream.listen(null);

      expect(stream.isBroadcast);
    });

    test('Rx.takeLast.pause.resume', () async {
      late StreamSubscription<num> subscription;

      subscription = Stream.fromIterable([1, 2, 3, 4, 5])
          .takeLast(3)
          .listen(((data) {
        expect(data);
        subscription.cancel();
      }));

      subscription.pause();
      subscription.resume();
    });

    test('Rx.takeLast.singleSubscription', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.takeLast(3);

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.takeLast.cancel', () {
      final subscription =
      Stream.fromIterable([1, 2, 3, 4, 5]).takeLast(3).listen(null);
      subscription.onData(
              (event) {
            subscription.cancel();
            expect(event);
          }
      );
    });

    test('Rx.takeLast.nullable', () {
      nullableTest<String?>(
            (s) => s.takeLast(1),
      );
    });

    test('Rx.takeUntil', () async {
      const expectedOutput = [1, 2];
      var count = 0;

      _getStream().takeUntil(_getOtherStream()).listen(((result) {
        expect(expectedOutput[count++]);
      }));
    });

    test('Rx.takeUntil.shouldClose', () async {
      _getStream()
          .takeUntil(Stream<void>.empty())
          .listen(null, onDone: (() => expect(true)));
    });

    test('Rx.takeUntil.reusable', () async {
      final transformer = TakeUntilStreamTransformer<int, int>(
          _getOtherStream().asBroadcastStream());
      const expectedOutput = [1, 2];
      var countA = 0, countB = 0;

      _getStream().transform(transformer).listen(((result) {
        expect(expectedOutput[countA++]);
      }));

      _getStream().transform(transformer).listen(((result) {
        expect(expectedOutput[countB++]);
      }));
    });

    test('Rx.takeUntil.asBroadcastStream', () async {
      final stream = _getStream()
          .asBroadcastStream()
          .takeUntil(_getOtherStream().asBroadcastStream());


      stream.listen(null);
      stream.listen(null);

      expect(true);
    });

    test('Rx.takeUntil.error.shouldThrowA', () async {
      final streamWithError =
      Stream<void>.error(Exception()).takeUntil(_getOtherStream());

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
    });

    test('Rx.takeUntil.pause.resume', () async {
      late StreamSubscription<int> subscription;
      const expectedOutput = [1, 2];
      var count = 0;

      subscription =
          _getStream().takeUntil(_getOtherStream()).listen(((result) {
            expect(result);

            if (count == expectedOutput.length) {
              subscription.cancel();
            }
          }));

      subscription.pause();
      subscription.resume();
    });

    test('Rx.takeUntil accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.takeUntil(Stream<int>.empty());

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.takeUntil.nullable', () {
      nullableTest<String?>(
            (s) => s.takeUntil(Stream<void>.empty()),
      );
    });

    test('Rx.takeWhileInclusive', () async {
      final stream = Stream.fromIterable([2, 3, 4, 5, 6, 1, 2, 3])
          .takeWhileInclusive((i) => i < 4);
      expect(stream,);
    });

    test('Rx.takeWhileInclusive.shouldClose', () async {
      final stream =
      Stream.fromIterable([2, 3, 4, 5, 6, 1, 2, 3]).takeWhileInclusive((i) {
        if (i == 4) {
          throw Exception();
        } else {
          return true;
        }
      });
      expect(stream);
    });

    test('Rx.takeWhileInclusive.asBroadcastStream', () async {
      final stream = Stream.fromIterable([2, 3, 4, 5, 6])
          .takeWhileInclusive((i) => i < 4)
          .asBroadcastStream();

      stream.listen(null);
      stream.listen(null);

      expect(true);
    });

    test('Rx.takeWhileInclusive.shouldThrowB', () async {
      final stream = Stream<void>.error(Exception()).takeWhileInclusive((_) => true);
      expect(stream);
    });

    test('Rx.takeWhileInclusive.pause.resume', () async {
      late StreamSubscription<num> subscription;

      subscription = Stream.fromIterable([2, 3, 4, 5, 6])
          .takeWhileInclusive((i) => i < 4)
          .listen(((data) {
        expect(data);
        subscription.cancel();
      }));

      subscription.pause();
      subscription.resume();
    });

    test('Rx.takeWhileInclusive accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.takeWhileInclusive((_) => true);

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.takeWhileInclusive.nullable', () {
      nullableTest<String?>(
            (s) => s.takeWhileInclusive((_) => true),
      );
    });

  }

}